package org.zjvis.datascience.common.util;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.zjvis.datascience.common.dto.TaskDTO;
import org.zjvis.datascience.common.enums.TaskTypeEnum;
import org.zjvis.datascience.common.vo.TaskVO;

import java.util.*;
import java.util.regex.Pattern;

/**
 * @description Task helper class. It keeps, per pipeline, a dictionary of task names (used to
 * generate unique names such as {@code name(2)}) and a cache of task nodes (used to compute
 * grid positions). Methods that mutate these caches synchronize on this instance.
 * @date 2021-11-15
 */
@Data
@Component
public class TaskUtil {

    private final static Logger logger = LoggerFactory.getLogger("TaskUtil");

    // Regex source for names that end with a numeric suffix in parentheses, e.g. "name(3)".
    private static final String INDEXED_NAME_REGEX = ".*\\(\\d{1,}\\)";

    // Compiled once: parse() runs for every task each time a pipeline is loaded.
    private static final Pattern INDEXED_NAME = Pattern.compile(INDEXED_NAME_REGEX);

    // Exclusive upper bound for the rows / columns scanned when searching for a free grid cell.
    private static final int GRID_LIMIT = 1000;

    // Kept as an instance field so the Lombok-generated accessor of the class stays unchanged.
    private final String PATTERN = INDEXED_NAME_REGEX;

    // Name dictionary of every pipeline: pipelineId -> (base name -> highest numeric suffix seen)
    private Map<Long, Map<String, Integer>> nodeNameMap = Maps.newConcurrentMap();

    // Task node cache of every pipeline: pipelineId -> (taskId -> task)
    private Map<Long, Map<Long, TaskDTO>> nodeMap = Maps.newConcurrentMap();

    // Kept during the copy phase: correspondence between new and old node ids, per pipeline
    private Map<Long, Map<Long, Long>> relationMap = Maps.newConcurrentMap();

    /**
     * Drops the name dictionary and node cache of a pipeline; the copy relation map is kept.
     *
     * @param pipelineId pipeline whose cached state is removed
     */
    public void clear(Long pipelineId) {
        clear(pipelineId, false);
    }

    /**
     * Drops the name dictionary and node cache of a pipeline.
     *
     * @param pipelineId pipeline whose cached state is removed
     * @param cleanAll   when true the copy relation map of the pipeline is removed as well
     */
    public void clear(Long pipelineId, boolean cleanAll) {
        synchronized (this) {
            nodeNameMap.remove(pipelineId);
            nodeMap.remove(pipelineId);
            if (cleanAll) {
                relationMap.remove(pipelineId);
            }
        }
    }

    /**
     * Whether both the name dictionary and the node cache hold the given pipeline.
     *
     * @param pipelineId pipeline to look up
     * @return true when both caches contain the pipeline
     */
    public boolean contain(Long pipelineId) {
        return nodeNameMap.containsKey(pipelineId) && this.containPipeline(pipelineId);
    }

    /**
     * Whether the node cache holds the given pipeline.
     *
     * @param pipelineId pipeline to look up
     * @return true when the node cache contains the pipeline
     */
    public boolean containPipeline(Long pipelineId) {
        return nodeMap.containsKey(pipelineId);
    }

    /**
     * Splits a task name into its base name and numeric suffix: {@code "foo(3)"} becomes
     * {@code ("foo", 3)}. A name without such a suffix, a {@code null} name, or a suffix that
     * does not fit into an {@code int} yields the unchanged name with index 0.
     *
     * @param name task name, may be null
     * @return pair of (base name, index)
     */
    public Pair<String, Integer> parse(String name) {
        if (name != null && INDEXED_NAME.matcher(name).matches()) {
            int open = name.lastIndexOf("(");
            int close = name.lastIndexOf(")");
            try {
                return Pair.of(name.substring(0, open), Integer.parseInt(name.substring(open + 1, close)));
            } catch (NumberFormatException ignored) {
                // The digits overflow int, so this cannot be a counter handed out by this class;
                // fall through and treat the whole string as a plain name.
            }
        }
        return Pair.of(name, 0);
    }

    /**
     * Builds (and replaces) the name dictionary of a pipeline from its tasks. For every base
     * name the highest numeric suffix found among the tasks is recorded.
     *
     * @param pipelineId pipeline the tasks belong to
     * @param lists      tasks of the pipeline
     */
    public void addPipeline(Long pipelineId, List<TaskDTO> lists) {
        synchronized (this) {
            Map<String, Integer> map = new HashMap<>();
            for (TaskDTO dto : lists) {
                Pair<String, Integer> pair = this.parse(dto.getName());
                // Keep the largest suffix seen for this base name.
                map.merge(pair.getKey(), pair.getValue(), Integer::max);
            }
            nodeNameMap.put(pipelineId, map);
        }
    }

    /**
     * Builds (and replaces) the node cache of a pipeline, keyed by task id.
     *
     * @param pipelineId pipeline the tasks belong to
     * @param lists      tasks of the pipeline
     */
    public void addPipelineTask(Long pipelineId, List<TaskDTO> lists) {
        synchronized (this) {
            Map<Long, TaskDTO> map = new HashMap<>();
            for (TaskDTO dto : lists) {
                map.put(dto.getId(), dto);
            }
            nodeMap.put(pipelineId, map);
        }
    }

    /**
     * Applies a single task change to the node cache of a pipeline. When the pipeline is not
     * cached the call is skipped with a warning.
     *
     * @param pipelineId pipeline the task belongs to
     * @param task       task that changed
     * @param opType     "copy", "add" or "update" store the task, "delete" removes it; any other
     *                   value leaves the cache untouched
     */
    public void updateTaskNode(Long pipelineId, TaskDTO task, String opType) {
        synchronized (this) {
            Map<Long, TaskDTO> map = nodeMap.get(pipelineId);
            if (map == null) {
                logger.warn("updateTaskNode skipped, pipeline {} is not cached", pipelineId);
                return;
            }
            switch (opType) {
                case "copy":
                case "add":
                case "update": {
                    map.put(task.getId(), task);
                    break;
                }
                case "delete": {
                    map.remove(task.getId());
                    break;
                }
                default:
                    break;
            }
        }
    }

    /**
     * Whether a grid cell is unoccupied, i.e. absent from the occupancy map or counted as 0.
     */
    private static boolean isFree(Map<Pair<Integer, Integer>, Integer> roadMap, Pair<Integer, Integer> cell) {
        return roadMap.getOrDefault(cell, 0) == 0;
    }

    /**
     * Adds one occupant to a grid cell.
     */
    private static void occupy(Map<Pair<Integer, Integer>, Integer> roadMap, Pair<Integer, Integer> cell) {
        roadMap.merge(cell, 1, Integer::sum);
    }

    /**
     * Removes one occupant from a grid cell; a cell that was never recorded is left alone.
     */
    private static void release(Map<Pair<Integer, Integer>, Integer> roadMap, Pair<Integer, Integer> cell) {
        roadMap.computeIfPresent(cell, (key, occupants) -> occupants - 1);
    }

    /**
     * Reads the "position" object of a task's data JSON; when it is missing a fresh position at
     * row 0 / col 0 is returned (it is not attached to {@code data}).
     */
    private static JSONObject positionOrOrigin(JSONObject data) {
        if (data.containsKey("position")) {
            return data.getJSONObject("position");
        }
        JSONObject origin = new JSONObject();
        origin.put("col", 0);
        origin.put("row", 0);
        return origin;
    }

    /**
     * Writes a new "position" object (row / col) into the task's data JSON.
     */
    private static void applyPosition(TaskDTO task, Pair<Integer, Integer> cell) {
        JSONObject position = new JSONObject();
        position.put("row", cell.getKey());
        position.put("col", cell.getValue());
        JSONObject data = task.view().getData();
        data.put("position", position);
        task.setDataJson(JSONObject.toJSONString(data));
    }

    /**
     * Fills {@code roadMap} with the number of tasks located on every (row, col) cell. Tasks
     * whose data has no "position" entry are skipped, since they occupy no cell.
     */
    private void getRoadMap(Map<Long, TaskDTO> map, Map<Pair<Integer, Integer>, Integer> roadMap) {
        for (TaskDTO dto : map.values()) {
            TaskVO vo = dto.view();
            JSONObject json = vo.getData().getJSONObject("position");
            if (json == null) {
                continue;
            }
            occupy(roadMap, Pair.of(json.getInteger("row"), json.getInteger("col")));
        }
    }

    /**
     * Moves {@code current} and every node reachable from it (children when {@code isForward},
     * parents otherwise) by {@code step} columns onto free cells. Reachable nodes are processed
     * before {@code current} itself. Every moved node is appended to {@code nodes} and stored in
     * {@code copyMap}; {@code roadMap} is updated along the way.
     * NOTE(review): a node reachable through several paths is visited, moved and appended once
     * per path; confirm whether callers rely on that.
     * NOTE(review): when {@link #findSpacePosition} finds no free cell it returns null and this
     * method fails with a NullPointerException.
     */
    private void recursionMovePosition(TaskDTO current, List<TaskDTO> nodes, Map<Long, TaskDTO> copyMap, Map<Pair<Integer, Integer>, Integer> roadMap, boolean isForward, int step) {
        if (current == null) {
            return;
        }
        String idsStr = isForward ? current.getChildId() : current.getParentId();
        if (StringUtils.isNotEmpty(idsStr)) {
            for (String id : idsStr.split(",")) {
                recursionMovePosition(copyMap.get(Long.parseLong(id)), nodes, copyMap, roadMap, isForward, step);
            }
        }
        JSONObject data = current.view().getData();
        JSONObject currentPosition = data.getJSONObject("position");
        Pair<Integer, Integer> currentPoi = Pair.of(currentPosition.getInteger("row"), currentPosition.getInteger("col"));
        Pair<Integer, Integer> newCurrentPoi = this.findSpacePosition(currentPoi, roadMap, isForward, step);
        release(roadMap, currentPoi);
        roadMap.put(newCurrentPoi, 1);
        currentPosition.put("row", newCurrentPoi.getKey());
        currentPosition.put("col", newCurrentPoi.getValue());
        data.put("position", currentPosition);
        current.setDataJson(JSONObject.toJSONString(data));
        copyMap.put(current.getId(), current);
        nodes.add(current);
    }

    /**
     * Same as the four-argument overload with a step of one column.
     */
    private Pair<Integer, Integer> findSpacePosition(Pair<Integer, Integer> current, Map<Pair<Integer, Integer>, Integer> roadMap, boolean isForward) {
        return findSpacePosition(current, roadMap, isForward, 1);
    }

    /**
     * Looks for a free cell in the column {@code step} to the right ({@code isForward}) or left
     * of {@code current}, scanning rows upwards from the current row.
     *
     * @return the first free cell, or null when every scanned row below the grid limit is taken
     */
    private Pair<Integer, Integer> findSpacePosition(Pair<Integer, Integer> current, Map<Pair<Integer, Integer>, Integer> roadMap, boolean isForward, int step) {
        int col = isForward ? current.getValue() + step : current.getValue() - step;
        for (int row = current.getKey(); row < GRID_LIMIT; ++row) {
            Pair<Integer, Integer> candidate = Pair.of(row, col);
            if (isFree(roadMap, candidate)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Counts the nodes of the connected graph reachable from {@code currentId}.
     * isForward:
     * true  search forward (follow child ids)
     * false search backward (follow parent ids)
     * Every id is counted once, even when it is reachable through several paths, and a cyclic
     * link cannot make the traversal run forever. Ids that are referenced but missing from the
     * node cache are counted but not expanded.
     *
     * @param pipelineId pipeline whose node cache is traversed
     * @param currentId  node the traversal starts from
     * @param isForward  traversal direction, see above
     * @param flag       output list; during a backward search it receives the id of every
     *                   visited node whose column minus {@code step} would be below 1
     * @param step       number of columns the nodes are about to be shifted by
     * @return number of distinct ids visited
     */
    public int getCountOfConnectedGraph(Long pipelineId, Long currentId, boolean isForward, List<Long> flag, int step) {
        int count = 0;
        Map<Long, TaskDTO> map = nodeMap.get(pipelineId);
        Set<Long> visited = new HashSet<>();
        Queue<Long> queue = new ArrayDeque<>();
        queue.offer(currentId);
        visited.add(currentId);
        while (!queue.isEmpty()) {
            Long id = queue.poll();
            ++count;
            TaskDTO dto = map.get(id);
            if (dto == null) {
                continue;
            }
            if (!isForward) {
                JSONObject data = dto.view().getData().getJSONObject("position");
                if (data.getInteger("col") - step < 1) {
                    flag.add(id);
                }
            }
            String ids = isForward ? dto.getChildId() : dto.getParentId();
            if (StringUtils.isNotEmpty(ids)) {
                for (String e : ids.split(",")) {
                    Long next = Long.valueOf(e);
                    // Only enqueue ids that have not been seen yet.
                    if (visited.add(next)) {
                        queue.offer(next);
                    }
                }
            }
        }
        return count;
    }

    /**
     * Computes the nodes that must move so that the column gap between a parent and a child is
     * widened. The child side is moved forward when the backward search flagged a node that
     * would fall left of column 1, or when the parent side holds more nodes than the child side;
     * otherwise the parent side is moved backward.
     *
     * @param pipelineId    pipeline whose node cache is used
     * @param referParentId id of the parent node
     * @param referChildId  id of the child node
     * @return the nodes whose position was modified
     */
    public List<TaskDTO> getNodeWithPositionModified(Long pipelineId, Long referParentId, Long referChildId) {
        List<TaskDTO> nodes = new ArrayList<>();
        synchronized (this) {
            Map<Pair<Integer, Integer>, Integer> roadMap = new HashMap<>();
            Map<Long, TaskDTO> map = nodeMap.get(pipelineId);
            getRoadMap(map, roadMap);
            Map<Long, TaskDTO> copyMap = new HashMap<>(map);
            List<Long> flagList = new ArrayList<>();
            TaskDTO parentNode = map.get(referParentId);
            TaskDTO childNode = map.get(referChildId);
            JSONObject parentPoi = parentNode.view().getData().getJSONObject("position");
            JSONObject childPoi = childNode.view().getData().getJSONObject("position");
            int step = parentPoi.getInteger("col") - childPoi.getInteger("col") + 1;
            int countBack = this.getCountOfConnectedGraph(pipelineId, referParentId, false, flagList, step);
            int countForward = this.getCountOfConnectedGraph(pipelineId, referChildId, true, flagList, step);
            boolean isForward = flagList.size() > 0 || countBack > countForward;
            TaskDTO start = isForward ? childNode : parentNode;
            this.recursionMovePosition(start, nodes, copyMap, roadMap, isForward, step);
        }
        return nodes;
    }

    /**
     * Computes the nodes whose position has to be adjusted for the given node.
     * Copy-on-write idea per the original note: the node cache is meant to be refreshed through
     * {@link #updateNodeMapForSuccess} only after the returned list was stored in the database.
     * The cached copy of the task is first placed on its parent's cell (when {@code parentId} is
     * given) or one column left of the reference child, then it and its descendants are moved
     * forward onto free cells.
     *
     * @param pipelineId   pipeline whose node cache is used
     * @param task         task being placed; its cached copy is the one that gets modified
     * @param parentId     id of the parent node, may be empty
     * @param referChildId id of the reference child, used when {@code parentId} is empty
     * @return the nodes whose position was modified; empty when the task has no children
     */
    public List<TaskDTO> getNodeWithPositionModified(Long pipelineId, TaskDTO task, String parentId, String referChildId) {
        List<TaskDTO> nodes = new ArrayList<>();
        if (StringUtils.isEmpty(task.getChildId())) {
            // Left to the front end.
            return nodes;
        }
        synchronized (this) {
            Map<Pair<Integer, Integer>, Integer> roadMap = new HashMap<>();
            Map<Long, TaskDTO> map = nodeMap.get(pipelineId);
            task = map.get(task.getId());
            getRoadMap(map, roadMap);
            Map<Long, TaskDTO> copyMap = new HashMap<>(map);
            if (StringUtils.isNotEmpty(parentId)) {
                TaskDTO parentNode = copyMap.get(Long.parseLong(parentId));
                JSONObject parentPoi = parentNode.view().getData().getJSONObject("position");
                JSONObject currentData = task.view().getData();
                JSONObject currentPoi = positionOrOrigin(currentData);
                Pair<Integer, Integer> parentPair = Pair.of(parentPoi.getInteger("row"), parentPoi.getInteger("col"));
                Pair<Integer, Integer> currentPair = Pair.of(currentPoi.getInteger("row"), currentPoi.getInteger("col"));
                // Temporarily stack the task on its parent; the recursion below shifts it right.
                currentData.put("position", parentPoi);
                task.setDataJson(JSONObject.toJSONString(currentData));
                copyMap.put(task.getId(), task);
                release(roadMap, currentPair);
                occupy(roadMap, parentPair);
            } else {
                TaskDTO childDto = copyMap.get(Long.parseLong(referChildId));
                JSONObject currentData = task.view().getData();
                JSONObject currentPoi = positionOrOrigin(currentData);
                Pair<Integer, Integer> currentPair = Pair.of(currentPoi.getInteger("row"), currentPoi.getInteger("col"));
                release(roadMap, currentPair);

                JSONObject childPoi = childDto.view().getData().getJSONObject("position");
                Pair<Integer, Integer> newPair = Pair.of(childPoi.getInteger("row"), childPoi.getInteger("col") - 1);
                occupy(roadMap, newPair);

                currentPoi.put("row", newPair.getKey());
                currentPoi.put("col", newPair.getValue());
                currentData.put("position", currentPoi);
                task.setDataJson(JSONObject.toJSONString(currentData));
                copyMap.put(task.getId(), task);
            }
            this.recursionMovePosition(task, nodes, copyMap, roadMap, true, 1);
        }

        return nodes;
    }

    /**
     * Reads the grid position of a task from its data JSON.
     *
     * @param task task to read
     * @return pair of (row, col)
     */
    public Pair<Integer, Integer> getPositionForTask(TaskDTO task) {
        JSONObject position = task.view().getData().getJSONObject("position");
        return Pair.of(position.getInteger("row"), position.getInteger("col"));
    }

    /**
     * Finds a free cell for a join or union node and writes it into the task. The search starts
     * one column right of the right-most of the first two parents and at the smaller of their
     * rows, scanning column by column. When no free cell is found the first parent's position is
     * used.
     *
     * @param pipelineId pipeline whose node cache is used
     * @param task       join / union task; its parent id must list at least two ids
     */
    public void updatePositionForUnionOrJoinNode(Long pipelineId, TaskDTO task) {
        synchronized (this) {
            String parentId = task.getParentId();
            Map<Long, TaskDTO> map = nodeMap.get(pipelineId);
            Map<Pair<Integer, Integer>, Integer> roadMap = new HashMap<>();
            getRoadMap(map, roadMap);
            String[] ids = parentId.split(",");
            Pair<Integer, Integer> position1 = this.getPositionForTask(map.get(Long.parseLong(ids[0])));
            Pair<Integer, Integer> position2 = this.getPositionForTask(map.get(Long.parseLong(ids[1])));
            int minRow = Math.min(position1.getKey(), position2.getKey());
            int maxCol = Math.max(position1.getValue(), position2.getValue());
            Pair<Integer, Integer> newPos = position1;
            search:
            for (int col = maxCol + 1; col < GRID_LIMIT; ++col) {
                for (int row = minRow; row < GRID_LIMIT; ++row) {
                    Pair<Integer, Integer> candidate = Pair.of(row, col);
                    if (isFree(roadMap, candidate)) {
                        newPos = candidate;
                        break search;
                    }
                }
            }
            applyPosition(task, newPos);
        }
    }

    /**
     * Whether the task is a data node.
     *
     * @param task task to test
     * @return true when the task type equals {@code TASK_TYPE_DATA}
     */
    public static boolean isDataNode(TaskDTO task) {
        return isThisTypeNode(task, TaskTypeEnum.TASK_TYPE_DATA);
    }

    /**
     * Whether the task is of the given type.
     *
     * @param task task to test
     * @param type expected type
     * @return true when the task type equals the value of {@code type}
     */
    public static boolean isThisTypeNode(TaskDTO task, TaskTypeEnum type) {
        return type.getVal().equals(task.getType());
    }

    /**
     * Finds a suitable position for the task based on its brother node and writes it into the
     * task: the first free cell below the brother in the same column. When no free cell is found
     * the brother's own position is used.
     *
     * @param pipelineId pipeline whose node cache is used
     * @param brotherId  id of the brother node
     * @param task       task to position
     */
    public void updatePositionDragAddForJoinOrUnion(Long pipelineId, Long brotherId, TaskDTO task) {
        synchronized (this) {
            Map<Long, TaskDTO> map = nodeMap.get(pipelineId);
            Map<Pair<Integer, Integer>, Integer> roadMap = new HashMap<>();
            getRoadMap(map, roadMap);
            Pair<Integer, Integer> pos = this.getPositionForTask(map.get(brotherId));
            int col = pos.getValue();
            Pair<Integer, Integer> newPos = pos;
            for (int row = pos.getKey() + 1; row < GRID_LIMIT; ++row) {
                Pair<Integer, Integer> candidate = Pair.of(row, col);
                if (isFree(roadMap, candidate)) {
                    newPos = candidate;
                    break;
                }
            }
            applyPosition(task, newPos);
        }
    }

    /**
     * Updates the node cache with the given nodes. When the pipeline is not cached the call is
     * skipped with a warning.
     *
     * @param pipelineId pipeline the nodes belong to
     * @param nodes      nodes to store, keyed by their id
     */
    public void updateNodeMapForSuccess(Long pipelineId, List<TaskDTO> nodes) {
        synchronized (this) {
            Map<Long, TaskDTO> map = nodeMap.get(pipelineId);
            if (map == null) {
                logger.warn("updateNodeMapForSuccess skipped, pipeline {} is not cached", pipelineId);
                return;
            }
            for (TaskDTO node : nodes) {
                map.put(node.getId(), node);
            }
        }
    }

    /**
     * Registers the name of a newly added task, renaming the task when the name is taken.
     *
     * @param pipelineId pipeline the task belongs to
     * @param task       task being added; its name may be modified
     */
    public void add(Long pipelineId, TaskDTO task) {
        this.modifyTaskName(pipelineId, task, false);
    }

    /**
     * Releases the name of a deleted task in the name dictionary. A name stored verbatim is
     * removed. Otherwise, when the name carries the highest suffix recorded for its base name,
     * the recorded suffix is lowered by one (or the base name is removed when the suffix is 0).
     * When the pipeline has no name dictionary the call is skipped with a warning.
     *
     * @param pipelineId pipeline the task belongs to
     * @param task       task being deleted
     */
    public void delete(Long pipelineId, TaskDTO task) {
        synchronized (this) {
            Map<String, Integer> map = nodeNameMap.get(pipelineId);
            if (map == null) {
                logger.warn("delete skipped, pipeline {} has no name dictionary", pipelineId);
                return;
            }
            String name = task.getName();
            if (map.containsKey(name)) {
                map.remove(name);
                return;
            }
            Pair<String, Integer> pair = this.parse(name);
            Integer recorded = map.get(pair.getKey());
            if (recorded == null) {
                return;
            }
            // Compare as primitives: == on two boxed Integers compares references and is only
            // reliable for small cached values.
            int highest = recorded;
            int index = pair.getValue();
            if (highest == index) {
                if (index != 0) {
                    map.put(pair.getKey(), index - 1);
                } else {
                    map.remove(pair.getKey());
                }
            }
        }
    }

    /**
     * Registers the (new) name of an updated task with index 0 when it is not known yet. Nothing
     * happens when the pipeline has no name dictionary.
     *
     * @param pipelineId pipeline the task belongs to
     * @param task       task being updated
     */
    public void update(Long pipelineId, TaskDTO task) {
        synchronized (this) {
            Map<String, Integer> map = nodeNameMap.get(pipelineId);
            if (map != null && !map.containsKey(task.getName())) {
                map.put(task.getName(), 0);
            }
        }
    }

    /**
     * Registers the name of a copied task; the copy always receives a numeric suffix.
     *
     * @param pipelineId pipeline the task belongs to
     * @param task       task being copied; its name is modified
     */
    public void copy(Long pipelineId, TaskDTO task) {
        this.modifyTaskName(pipelineId, task, true);
    }

    /**
     * Makes the task name unique within the pipeline and records it. A known name gets the next
     * suffix ({@code name(n+1)}). An unknown name is recorded as is, unless {@code isCopy} is
     * true, in which case the task is renamed to {@code name(1)}.
     *
     * @param pipelineId pipeline the task belongs to
     * @param task       task whose name is registered and possibly modified
     * @param isCopy     whether the task is a copy of an existing one
     * @throws NullPointerException when the pipeline has no name dictionary
     */
    public void modifyTaskName(Long pipelineId, TaskDTO task, boolean isCopy) {
        synchronized (this) {
            Map<String, Integer> map = nodeNameMap.get(pipelineId);
            String name = task.getName();
            if (map.containsKey(name)) {
                int nextIndex = map.get(name) + 1;
                map.put(name, nextIndex);
                task.setName(String.format("%s(%s)", name, nextIndex));
            } else if (isCopy) {
                task.setName(String.format("%s(%s)", name, 1));
                map.put(name, 1);
            } else {
                map.put(name, 0);
            }
        }
    }

    /**
     * Parses the data JSON of a task node and extracts the output table name from the first
     * entry of its "output" array. How the name is normalised depends on the task type.
     *
     * @param taskDTO task to read
     * @return the table name, or an empty string when the data JSON cannot be parsed into an
     * object or has no non-empty "output" array
     */
    public static String extractTableStr(TaskDTO taskDTO) {
        JSONObject data = JSONObject.parseObject(taskDTO.getDataJson());
        if (data == null) {
            return StringUtils.EMPTY;
        }
        JSONArray output = data.getJSONArray("output");
        if (output == null || output.size() == 0) {
            return StringUtils.EMPTY;
        }
        JSONObject item = output.getJSONObject(0);
        String outTableName = item.getString("tableName");
        if (isThisTypeNode(taskDTO, TaskTypeEnum.TASK_TYPE_MODEL)) {
            if (!outTableName.contains(".")) {
                outTableName = "dataset." + ToolUtil.alignTableName(outTableName, 0L);
            }
        } else if (isThisTypeNode(taskDTO, TaskTypeEnum.TASK_TYPE_DATA) ||
                isThisTypeNode(taskDTO, TaskTypeEnum.TASK_TYPE_CLEAN) ||
                isThisTypeNode(taskDTO, TaskTypeEnum.TASK_TYPE_ALGOPY)) {
            outTableName = ToolUtil.alignTableName(outTableName, 0L);
        } else if (isThisTypeNode(taskDTO, TaskTypeEnum.TASK_TYPE_ALGO) ||
                isThisTypeNode(taskDTO, TaskTypeEnum.TASK_TYPE_ETL)) {
            outTableName = ToolUtil.alignTableName(outTableName, data.getLongValue("lastTimeStamp"));
        }
        return outTableName;
    }

    /**
     * Returns the new/old node id relation recorded for a pipeline.
     *
     * @param pipelineId pipeline to look up
     * @return the relation map, or null when none is recorded for the pipeline
     */
    public Map<Long, Long> getRelationShipByPipelineId(Long pipelineId) {
        return relationMap.get(pipelineId);
    }
}